package com.bigfans.searchservice.listener;

import com.bigfans.Constants;
import com.bigfans.framework.es.ElasticTemplate;
import com.bigfans.framework.es.IndexDocument;
import com.bigfans.framework.es.request.BulkInsertCriteria;
import com.bigfans.framework.es.request.CreateDocumentCriteria;
import com.bigfans.framework.utils.CollectionUtils;
import com.bigfans.model.event.ProductCreatedEvent;
import com.bigfans.framework.kafka.KafkaConsumerBean;
import com.bigfans.framework.kafka.KafkaListener;
import com.bigfans.searchservice.api.clients.CatalogServiceClient;
import com.bigfans.searchservice.model.Product;
import com.bigfans.searchservice.model.ProductAttribute;
import com.bigfans.searchservice.model.ProductSpec;
import com.bigfans.searchservice.model.Tag;
import com.bigfans.searchservice.schema.mapping.ProductMapping;
import com.bigfans.searchservice.schema.mapping.TagMapping;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.FutureTask;

/**
 * @author lichong
 * @create 2018-02-10 下午2:03
 **/
@KafkaConsumerBean
@Component
public class ProductListener {

    @Autowired
    private CatalogServiceClient catalogServiceClient;
    @Autowired
    private ElasticTemplate elasticTemplate;

    @KafkaListener
    public void on(ProductCreatedEvent event) throws Exception {
        CompletableFuture<Product> productFuture = catalogServiceClient.getProduct(event.getId());
        CompletableFuture<List<ProductAttribute>> attributesFuture = catalogServiceClient.getAttributesByProductId(event.getId());
        CompletableFuture<List<ProductSpec>> specsFuture = catalogServiceClient.getSpecsByProductId(event.getId());
        CompletableFuture<List<Tag>> tagsFuture = catalogServiceClient.getTagsByProductId(event.getId());

        CompletableFuture<Void> allResult = CompletableFuture.allOf(productFuture, attributesFuture, specsFuture, tagsFuture);
        allResult.join();
        Product product = productFuture.get();
        IndexDocument doc = new IndexDocument(product.getId());
        doc.put(ProductMapping.FIELD_ID, product.getId());
        doc.put(ProductMapping.FIELD_ORIGIN, product.getOrigin());
        doc.put(ProductMapping.FIELD_BRAND, product.getBrandName());
        doc.put(ProductMapping.FIELD_BRAND_ID, product.getBrandId());
        doc.put(ProductMapping.FIELD_NAME, product.getName());
        doc.put(ProductMapping.FIELD_IMAGE_PATH, product.getImagePath());
        doc.put(ProductMapping.FIELD_PRICE, product.getPrice());
        doc.put(ProductMapping.FIELD_CATEGORY_ID, product.getCategoryId());
        doc.put(ProductMapping.FIELD_CATEGORY, product.getCategoryName());

        // 规格
        List<ProductSpec> specList = specsFuture.get();
        if (CollectionUtils.isNotEmpty(specList)) {
            List<Map<String, Object>> specData = new ArrayList<>();
            for (ProductSpec ps : specList) {
                Map<String, Object> specMap = new HashMap<>();
                specMap.put(ProductMapping.FIELD_SPEC_OPTID, ps.getOptionId());
                specMap.put(ProductMapping.FIELD_SPEC_OPTNAME, ps.getOption());
                specMap.put(ProductMapping.FIELD_SPEC_VALID, ps.getValueId());
                specMap.put(ProductMapping.FIELD_SPEC_VALNAME, ps.getValue());
                specData.add(specMap);
            }
            doc.put(ProductMapping.FIELD_SPECS, specData);
        }

        //  属性字段
        List<ProductAttribute> attrValueList = attributesFuture.get();
        if (CollectionUtils.isNotEmpty(attrValueList)) {
            List<Map<String, Object>> attrData = new ArrayList<>();
            for (ProductAttribute avv : attrValueList) {
                Map<String, Object> attrMap = new HashMap<>();
                attrMap.put(ProductMapping.FIELD_ATTRIBUTE_OPTID, avv.getOptionId());
                attrMap.put(ProductMapping.FIELD_ATTRIBUTE_OPTNAME, avv.getOptionName());
                attrMap.put(ProductMapping.FIELD_ATTRIBUTE_VALID, avv.getValueId());
                attrMap.put(ProductMapping.FIELD_ATTRIBUTE_VALNAME, avv.getValue());
                attrData.add(attrMap);
            }
            doc.put(ProductMapping.FIELD_ATTRIBUTES, attrData);
        }
        // 标签
        List<Tag> tags = tagsFuture.get();
        List<Map<String, Object>> tagDatas = new ArrayList<>();
        for (Tag t : tags) {
            Map<String, Object> tagData = new HashMap<>();
            tagData.put(ProductMapping.FIELD_TAG_ID, t.getId());
            tagData.put(ProductMapping.FIELD_TAG_NAME, t.getName());
            tagDatas.add(tagData);
        }
        doc.put(ProductMapping.FIELD_TAGS, tagDatas);

        // 商品特征,用于商品推荐
        StringBuilder features = new StringBuilder(30);
        features.append(ProductMapping.FIELD_CATEGORY).append("=").append(product.getCategoryName()).append(Constants.SPACE);
        features.append(ProductMapping.FIELD_BRAND).append("=").append(product.getBrandName()).append(Constants.SPACE);
        if (CollectionUtils.isNotEmpty(attrValueList)) {
            for (ProductAttribute av : attrValueList) {
                features.append(av.getOptionId()).append("=").append(av.getValue()).append(Constants.SPACE);
            }
        }
        doc.put(ProductMapping.FIELD_FEATURES, features.toString());

        CreateDocumentCriteria createDocumentCriteria = new CreateDocumentCriteria();
        createDocumentCriteria.setIndex(ProductMapping.INDEX);
        createDocumentCriteria.setType(ProductMapping.TYPE);
        createDocumentCriteria.setDocument(doc);
        elasticTemplate.insert(createDocumentCriteria);

        this.createTagDocs(tags);
    }

    private void createTagDocs(List<Tag> tags) {
        if (CollectionUtils.isEmpty(tags)) {
            return;
        }
        List<IndexDocument> documentList = new ArrayList<>();
        // 创建tag文档
        for (Tag tag : tags) {
            IndexDocument doc = new IndexDocument(tag.getId());
            doc.put("id", tag.getId());
            doc.put("name", tag.getName());
            doc.put("related_count", tag.getRelatedCount());
            documentList.add(doc);
        }
        BulkInsertCriteria bulkInsertCriteria = new BulkInsertCriteria();
        bulkInsertCriteria.setIndex(TagMapping.INDEX);
        bulkInsertCriteria.setType(TagMapping.TYPE);
        bulkInsertCriteria.setDocList(documentList);
        elasticTemplate.bulkInsert(bulkInsertCriteria);
    }

}
